package javaCSV;

import java.util.stream.Stream;

public class SendMessageApplication {

    public static void main(String[] args) throws Exception {
        // 文件地址
        String filePath = "E:\\workspace\\develop\\javaCodes\\flink-kafka-master\\javaCSV\\javaCSV\\UserBehavior.csv";
        // kafka topic
        String topic = "userBehavior";
        // kafka borker地 址
        String broker = "192.168.43.112:9092";

        Stream.generate(new UserBehaviorCsvFileReader(filePath))
                .sequential()
                .forEachOrdered(new KafkaProducer(topic, broker));

        /*
        bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 1 --topic userBehavior
        bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic userBehavior
         */

    }
}
